#!/bin/bash

# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


# Entrypoint script for yarn container.

docker_name="${FRAMEWORK_NAME//\~/-}-$CONTAINER_ID"
export PAI_DEFAULT_FS_URI={{{ hdfsUri }}}

exec 13>$LAUNCHER_LOG_DIR/YarnContainerDebug.log
BASH_XTRACEFD=13

function debug_log()
{
  if [ $# -gt 1 ]; then
    local prefix=$1
    local message=$2
    echo "[DEBUG] $prefix: $message"
  fi
}

function exit_handler()
{
  rc=$?
  local handler="Yarn container exit handler"
  debug_log "$handler"  "EXIT signal received in yarn container, performing clean up action..."

  debug_log "$handler" "write exit code to file"
  debug_log "$handler" "yarn container exit code: $rc"
  debug_log "$handler" "exit code file path: /var/lib/hadoopdata/nm-local-dir/nmPrivate/$APP_ID/$CONTAINER_ID/$CONTAINER_ID.pid.exitcode"
  echo $rc > "/var/lib/hadoopdata/nm-local-dir/nmPrivate/$APP_ID/$CONTAINER_ID/$CONTAINER_ID.pid.exitcode"

  debug_log "$handler" "clean the container code"
  rm -fr tmp/pai-root/code 2>/dev/null

  exit $rc
}

set -x
PS4="+[\t] "
trap exit_handler EXIT


mkdir -p "/tmp/pai-root/alive/$APP_ID"
while /bin/true; do
  touch "/tmp/pai-root/alive/$APP_ID/yarn_$CONTAINER_ID"
  sleep 20
  [ -f "/tmp/pai-root/alive/$APP_ID/docker_$CONTAINER_ID" ] && [ ! "$(docker ps | grep $docker_name)" ] && break
done &

gpu_id=''
nvidia_devices=''
{{# taskData.gpuNumber }}
nvidia_devices+='--device=/dev/nvidiactl --device=/dev/nvidia-uvm'
gpu_bitmap=$(perl -e 'printf "%b",'$CONTAINER_GPUS)
for (( i=0,j=$((${#gpu_bitmap}-1)); i<${#gpu_bitmap}; i++,j-- )); do
  if [ ${gpu_bitmap:$i:1} -gt 0 ]; then
    gpu_id+="$j,"
    nvidia_devices+=" --device=/dev/nvidia$j"
  fi
done
gpu_id=$(echo $gpu_id | sed "s/,$//g")
{{/ taskData.gpuNumber }}
printf "%s %s\n%s\n\n" "[INFO]" "GPU_ID" "$gpu_id"
printf "%s %s\n%s\n\n" "[INFO]" "NVIDIA_DEVICES" "$nvidia_devices"


{{# jobData.authFile }}
IFS=$'\r\n' GLOBIGNORE='*' \
  eval 'cred=($(hdfs dfs -cat {{{ jobData.authFile }}}))' \
  || { echo 'Read authFile failed' ; exit 1; }
docker login --username ${cred[1]} --password ${cred[2]} ${cred[0]} \
  || { echo 'Authorized failed' ; exit 1; }
{{/ jobData.authFile }}

ssh_config_file="tmp/pai-root/ssh_config"
mkdir -p ${ssh_config_file}
# Get container host list from framework launcher webservice
function get_host_list()
{
  delay_time=20
  info_source="webhdfs"
  while true; do
    if [[ "$info_source" = "webhdfs" ]]; then
      taskrole_statuses=$(curl -L -H "Accept: application/json" {{{ frameworkInfoWebhdfsUri }}} | jq -r ".aggregatedFrameworkStatus.aggregatedTaskRoleStatuses")
    else
      taskrole_statuses=$(curl -sS -X GET -H "Accept: application/json" {{{ aggregatedStatusUri }}} | jq -r ".aggregatedTaskRoleStatuses")
    fi
    jq -re ".[].taskStatuses.taskStatusArray[].containerIp" <<< $(echo $taskrole_statuses) > /dev/null
    if [ $? -eq 0 ]; then
      null_line=$(jq -r ".[].taskStatuses.taskStatusArray[].containerIp" <<< $(echo $taskrole_statuses) | grep -Ecv "([0-9]{1,3}.?){4}")
      if [ $null_line -eq 0 ]; then
        break
      fi
    fi
    printf "%s %s\n" "[WARNING]" "Failed to request info from $info_source, sleep $delay_time s ..."
    sleep ${delay_time}s
    if [[ $delay_time -ge 300 ]]; then
      if [[ "$info_source" = "webhdfs" ]]; then
        delay_time=60
        info_source="restapi"
      else
        exit 205
      fi
    else
      if [[ "$info_source" = "webhdfs" ]]; then
        delay_time=$((delay_time + 20))
      else
        delay_time=$((delay_time + 60))
      fi
    fi
  done
  for taskrole in $(echo $taskrole_statuses |  jq -r ".|keys[]"); do
    host_list=''
    while read container_ip && read container_ports && read task_index; do
      host_list+=${container_ip}:$(echo $container_ports | grep -o "http:[^,;]*" | cut -f 2 -d":"),
      ssh_port=$(echo $container_ports | grep -o "ssh:[^,;]*" | cut -f 2 -d":")
      printf "%s\n  %s\n  %s\n  %s\n  %s\n  %s\n  %s\n" \
        "Host ${taskrole}-${task_index}" \
        "HostName ${container_ip}" \
        "Port ${ssh_port}" \
        "User root" \
        "StrictHostKeyChecking no" \
        "UserKnownHostsFile /dev/null" \
        "IdentityFile /root/.ssh/{{{ jobData.jobName }}}" >> ${ssh_config_file}/config
    done < <(echo $taskrole_statuses | jq -r ".${taskrole}.taskStatuses.taskStatusArray[] | .containerIp, .containerPorts, .taskIndex")
    host_list=$(echo $host_list | sed "s/,$//g")
    echo PAI_TASK_ROLE_${taskrole}_HOST_LIST=$host_list >> $1
  done
}

# Prepare env file
ENV_LIST=env.list

cat <<ENV >> $ENV_LIST
APP_ID=$APP_ID
FRAMEWORK_NAME=$FRAMEWORK_NAME
HADOOP_USER_NAME=$HADOOP_USER_NAME
PAI_TASK_INDEX=$TASK_INDEX
PAI_CONTAINER_HOST_IP=$CONTAINER_IP
PAI_CONTAINER_HOST_PORT_LIST=$CONTAINER_PORTS
PAI_CONTAINER_ID=$CONTAINER_ID
PAI_DEFAULT_FS_URI={{{ hdfsUri }}}
PAI_JOB_NAME={{{ jobData.jobName }}}
PAI_USER_NAME={{{ jobData.userName }}}
PAI_DATA_DIR={{{ jobData.dataDir }}}
PAI_OUTPUT_DIR={{{ jobData.outputDir }}}
PAI_CODE_DIR={{{ jobData.codeDir }}}
PAI_CURRENT_TASK_ROLE_NAME={{{ taskData.name }}}
PAI_CURRENT_TASK_ROLE_TASK_COUNT={{{ taskData.taskNumber }}}
PAI_CURRENT_TASK_ROLE_CPU_COUNT={{{ taskData.cpuNumber }}}
PAI_CURRENT_TASK_ROLE_MEM_MB={{{ taskData.memoryMB }}}
PAI_CURRENT_TASK_ROLE_SHM_MB={{{ taskData.shmMB }}}
PAI_CURRENT_TASK_ROLE_GPU_COUNT={{{ taskData.gpuNumber }}}
PAI_CURRENT_TASK_ROLE_MIN_FAILED_TASK_COUNT={{{ taskData.minFailedTaskCount }}}
PAI_CURRENT_TASK_ROLE_MIN_SUCCEEDED_TASK_COUNT={{{ taskData.minSucceededTaskCount }}}
PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX=$TASK_INDEX
PAI_JOB_TASK_COUNT={{{ tasksNumber }}}
PAI_JOB_TASK_ROLE_COUNT={{{ taskRolesNumber }}}
PAI_JOB_TASK_ROLE_LIST={{{ taskRoleList }}}
ENV

port_list=(${CONTAINER_PORTS//;/ })
for ports in "${port_list[@]}"; do
  port_label="$(cut -f 1 -d":" <<< $ports)"
  export PAI_CONTAINER_HOST_${port_label}_PORT_LIST="$(cut -f 2 -d":" <<< $ports)"
  echo PAI_CONTAINER_HOST_${port_label}_PORT_LIST="$(cut -f 2 -d":" <<< $ports)" >> $ENV_LIST
done

echo PAI_CONTAINER_HOST_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_http_PORT_LIST)" >> $ENV_LIST
echo PAI_CONTAINER_SSH_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_ssh_PORT_LIST)" >> $ENV_LIST

get_host_list $ENV_LIST

# Backward compatibility
cat <<ENV >> $ENV_LIST
# Backward compatibility
PAI_USERNAME={{{ jobData.userName }}}
PAI_TASK_ROLE_NAME={{{ taskData.name }}}
PAI_TASK_ROLE_NUM={{{ taskData.taskNumber }}}
PAI_TASK_CPU_NUM={{{ taskData.cpuNumber }}}
PAI_TASK_MEM_MB={{{ taskData.memoryMB }}}
PAI_TASK_GPU_NUM={{{ taskData.gpuNumber }}}
PAI_TASK_ROLE_INDEX=$TASK_INDEX
PAI_TASKS_NUM={{{ tasksNumber }}}
PAI_TASK_ROLES_NUM={{{ taskRolesNumber }}}
PAI_TASK_ROLE_LIST={{{ taskRoleList }}}
PAI_CURRENT_CONTAINER_IP=$CONTAINER_IP
ENV

echo PAI_CURRENT_CONTAINER_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_http_PORT_LIST)" >> $ENV_LIST

export $(grep -v '^#' $ENV_LIST | xargs)
# put it after the 'export', so we won't source it into yarn container.
# we want job environment parameters to be available only in job contianer.
{{# jobEnvs }}
cat <<ENV >> $ENV_LIST
{{{ jobEnvs }}}
ENV
{{/ jobEnvs }}

# Prepare docker bootstrap script
bootstrap_dir="tmp/pai-root/bootstrap"
mkdir -p $bootstrap_dir
hdfs dfs -get {{{ hdfsUri }}}/Container/$HADOOP_USER_NAME/$FRAMEWORK_NAME/DockerContainerScripts/{{{ idx }}}.sh $bootstrap_dir/docker_bootstrap.sh \
  || { echo "Can not get script from HDFS. HDFS may be down."; exit 1; }

# Prepare user code
code_dir="tmp/pai-root/code"
mkdir -p $code_dir
if [[ -n $PAI_CODE_DIR ]]; then
  hdfs dfs -stat $PAI_CODE_DIR \
    || { echo "Can not stat $PAI_CODE_DIR"; exit 1; }
  code_dir_size="$(hdfs dfs -du -s $PAI_CODE_DIR | cut -d ' ' -f 1)"
  printf "%s %s\n" "[INFO]" "User code_dir_size is ${code_dir_size}"
  # TODO: Make the threshold configurable
  if [ $code_dir_size -gt 0 ] && [ $code_dir_size -le 200000000 ]; then
    hdfs dfs -get $PAI_CODE_DIR $code_dir \
      || { echo "Can not get code from HDFS. HDFS may be down."; exit 1; }
  else
    echo "$PAI_CODE_DIR is larger than 200MB, exit."
    exit 1
  fi
fi

# Prepare docker debug log
mkdir -p "/tmp/pai-root/log/$APP_ID/$CONTAINER_ID"
ln -s /tmp/pai-root/log/$APP_ID/$CONTAINER_ID/DockerContainerDebug.log $LAUNCHER_LOG_DIR/DockerContainerDebug.log

# retrieve the yarn local dir
hadoop_tmp_dir="/var/lib/hadoopdata"
container_id=$(cat /proc/self/cgroup | grep "memory" | awk -F '/' '{print $NF}')
mounted_path=$(docker inspect $container_id |\
  jq -r --arg hadoop_tmp_dir "$hadoop_tmp_dir" '.[] | .Mounts | .[] | select(.Destination==$hadoop_tmp_dir) | .Source')
container_local_dir=$mounted_path/nm-local-dir/usercache/{{{ jobData.userName }}}/appcache/$APP_ID/$CONTAINER_ID

# Pull docker image and run
docker pull {{{ jobData.image }}} \
  || { echo "Can not pull Docker image"; exit 1; }

## ATTENTION: do not specify `--pid=host` when run job, otherwise job-exporter can not get right
## network consumption
docker run --name $docker_name \
  --init \
  --rm \
  --tty \
  --privileged=false \
  --oom-score-adj=1000 \
  --cap-add=SYS_ADMIN \
  --network=host \
  --cpus={{{ taskData.cpuNumber }}} \
  --memory={{{ taskData.memoryMB }}}m \
  --shm-size={{{ taskData.shmMB }}}m \
  $nvidia_devices \
  --device=/dev/fuse \
  --security-opt apparmor:unconfined \
  --volume /tmp/pai-root/alive/$APP_ID:/alive \
  --volume /tmp/pai-root/alive/$APP_ID/yarn_$CONTAINER_ID:/alive/yarn_$CONTAINER_ID:ro \
  --volume /tmp/pai-root/log/$APP_ID/$CONTAINER_ID:/pai/log \
  --volume $container_local_dir/$bootstrap_dir:/pai/bootstrap:ro \
  --volume $container_local_dir/$code_dir:/pai/code:ro \
  --volume /var/drivers/nvidia/current:/usr/local/nvidia:ro \
  --volume /etc/hadoop-configuration-for-jobs:/etc/hadoop:ro \
  --volume $container_local_dir/$ssh_config_file:/pai/ssh_config/ \
  --label GPU_ID=$gpu_id \
  --label PAI_HOSTNAME="$(hostname)" \
  --label PAI_JOB_NAME={{{ jobData.jobName }}} \
  --label PAI_JOB_VC_NAME={{{ jobData.virtualCluster }}} \
  --label PAI_USER_NAME={{{ jobData.userName }}} \
  --label PAI_CURRENT_TASK_ROLE_NAME={{{ taskData.name }}} \
  --env-file $ENV_LIST \
   {{{ jobData.image }}} \
  /bin/bash '/pai/bootstrap/docker_bootstrap.sh'

